Attention系列二(代码篇)

Attention is all you need

代码都放在了github https://github.com/xuanzebi/Attention ,之后读论文如果遇到不错的其他attention机制也会继续更新。欢迎star~

attention

公式如下图,可以看到如果Q、K、V 都设置成对应一个文本的参数的话,就是self-attention。

Q K可以设置成对应不同词向量后的不同的参数,比如在问答中。比如aspect的情感分类Q可以设置成context,K则设置成aspect过embedding后的参数。具体可以看下方代码,通俗易懂。

1556183192324

1556183298559

代码 keras

苏神keras实现的版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class Attention(Layer):

def __init__(self, nb_head, size_per_head, mask_right=False, **kwargs):
self.nb_head = nb_head
self.size_per_head = size_per_head
self.output_dim = nb_head * size_per_head
self.mask_right = mask_right
super(Attention, self).__init__(**kwargs)

def build(self, input_shape):
self.WQ = self.add_weight(name='WQ',
shape=(input_shape[0][-1], self.output_dim),
initializer='glorot_uniform',
trainable=True)
self.WK = self.add_weight(name='WK',
shape=(input_shape[1][-1], self.output_dim),
initializer='glorot_uniform',
trainable=True)
# 这里一般 v_len = k_len
self.WV = self.add_weight(name='WV',
shape=(input_shape[2][-1], self.output_dim),
initializer='glorot_uniform',
trainable=True)
super(Attention, self).build(input_shape)

def Mask(self, inputs, seq_len, mode='mul'):
if seq_len == None:
return inputs
else:
mask = K.one_hot(seq_len[:, 0], K.shape(inputs)[1])
mask = 1 - K.cumsum(mask, 1)
for _ in range(len(inputs.shape) - 2):
mask = K.expand_dims(mask, 2)
if mode == 'mul':
return inputs * mask
if mode == 'add':
return inputs - (1 - mask) * 1e12

def call(self, x):
# 如果只传入Q_seq,K_seq,V_seq,那么就不做Mask
# 如果同时传入Q_seq,K_seq,V_seq,Q_len,V_len,那么对多余部分做Mask
if len(x) == 3:
Q_seq, K_seq, V_seq = x
Q_len, V_len = None, None
elif len(x) == 5:
Q_seq, K_seq, V_seq, Q_len, V_len = x
# 对Q、K、V做线性变换
Q_seq = K.dot(Q_seq, self.WQ)
Q_seq = K.reshape(Q_seq, (-1, K.shape(Q_seq)[1], self.nb_head, self.size_per_head))
Q_seq = K.permute_dimensions(Q_seq, (0, 2, 1, 3)) # nb_head ,q_len , size_per_head
K_seq = K.dot(K_seq, self.WK)
K_seq = K.reshape(K_seq, (-1, K.shape(K_seq)[1], self.nb_head, self.size_per_head))
K_seq = K.permute_dimensions(K_seq, (0, 2, 1, 3)) # nb_head ,k_len , size_per_head
V_seq = K.dot(V_seq, self.WV)
V_seq = K.reshape(V_seq, (-1, K.shape(V_seq)[1], self.nb_head, self.size_per_head))
V_seq = K.permute_dimensions(V_seq, (0, 2, 1, 3)) # nb_head ,v_len , size_per_head
# 计算内积,然后mask,然后softmax
A = K.batch_dot(Q_seq, K_seq, axes=[3, 3]) / self.size_per_head ** 0.5 # nb_head,q_len,k_len
A = K.permute_dimensions(A, (0, 3, 2, 1)) # nb_head,k_len,q_len
A = self.Mask(A, V_len, 'add')
A = K.permute_dimensions(A, (0, 3, 2, 1)) # nb_head ,q_len,k_len
if self.mask_right:
ones = K.ones_like(A[:1, :1])
mask = (ones - K.tf.matrix_band_part(ones, -1, 0)) * 1e12
A = A - mask
A = K.softmax(A)
# 输出并mask
O_seq = K.batch_dot(A, V_seq, axes=[3, 2]) # v_len = k_len nb_head , q_len , size_per_head
O_seq = K.permute_dimensions(O_seq, (0, 2, 1, 3)) # q_len , nb_head,size_per_head
O_seq = K.reshape(O_seq, (-1, K.shape(O_seq)[1], self.output_dim)) # q_len , nb_head*size_per_head
O_seq = self.Mask(O_seq, Q_len, 'mul')
return O_seq

def compute_output_shape(self, input_shape):
return (input_shape[0][0], input_shape[0][1], self.output_dim)

代码 tensorflow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def attention_layer(from_tensor,
to_tensor,
attention_mask=None,
num_attention_heads=1,
size_per_head=512,
query_act=None,
key_act=None,
value_act=None,
attention_probs_dropout_prob=0.0,
initializer_range=0.02,
do_return_2d_tensor=False,
batch_size=None,
from_seq_length=None,
to_seq_length=None):
"""Performs multi-headed attention from `from_tensor` to `to_tensor`.

This is an implementation of multi-headed attention based on "Attention
is all you Need". If `from_tensor` and `to_tensor` are the same, then
this is self-attention. Each timestep in `from_tensor` attends to the
corresponding sequence in `to_tensor`, and returns a fixed-with vector.

This function first projects `from_tensor` into a "query" tensor and
`to_tensor` into "key" and "value" tensors. These are (effectively) a list
of tensors of length `num_attention_heads`, where each tensor is of shape
[batch_size, seq_length, size_per_head].

Then, the query and key tensors are dot-producted and scaled. These are
softmaxed to obtain attention probabilities. The value tensors are then
interpolated by these probabilities, then concatenated back to a single
tensor and returned.

In practice, the multi-headed attention are done with transposes and
reshapes rather than actual separate tensors.

Args:
from_tensor: float Tensor of shape [batch_size, from_seq_length,
from_width].
to_tensor: float Tensor of shape [batch_size, to_seq_length, to_width].
attention_mask: (optional) int32 Tensor of shape [batch_size,
from_seq_length, to_seq_length]. The values should be 1 or 0. The
attention scores will effectively be set to -infinity for any positions in
the mask that are 0, and will be unchanged for positions that are 1.
num_attention_heads: int. Number of attention heads.
size_per_head: int. Size of each attention head.
query_act: (optional) Activation function for the query transform.
key_act: (optional) Activation function for the key transform.
value_act: (optional) Activation function for the value transform.
attention_probs_dropout_prob: (optional) float. Dropout probability of the
attention probabilities.
initializer_range: float. Range of the weight initializer.
do_return_2d_tensor: bool. If True, the output will be of shape [batch_size
* from_seq_length, num_attention_heads * size_per_head]. If False, the
output will be of shape [batch_size, from_seq_length, num_attention_heads
* size_per_head].
batch_size: (Optional) int. If the input is 2D, this might be the batch size
of the 3D version of the `from_tensor` and `to_tensor`.
from_seq_length: (Optional) If the input is 2D, this might be the seq length
of the 3D version of the `from_tensor`.
to_seq_length: (Optional) If the input is 2D, this might be the seq length
of the 3D version of the `to_tensor`.

Returns:
float Tensor of shape [batch_size, from_seq_length,
num_attention_heads * size_per_head]. (If `do_return_2d_tensor` is
true, this will be of shape [batch_size * from_seq_length,
num_attention_heads * size_per_head]).

Raises:
ValueError: Any of the arguments or tensor shapes are invalid.
"""

def transpose_for_scores(input_tensor, batch_size, num_attention_heads,
seq_length, width):
output_tensor = tf.reshape(
input_tensor, [batch_size, seq_length, num_attention_heads, width])

output_tensor = tf.transpose(output_tensor, [0, 2, 1, 3])
return output_tensor

from_shape = get_shape_list(from_tensor, expected_rank=[2, 3])
to_shape = get_shape_list(to_tensor, expected_rank=[2, 3])

if len(from_shape) != len(to_shape):
raise ValueError(
"The rank of `from_tensor` must match the rank of `to_tensor`.")

if len(from_shape) == 3:
batch_size = from_shape[0]
from_seq_length = from_shape[1]
to_seq_length = to_shape[1]
elif len(from_shape) == 2:
if (batch_size is None or from_seq_length is None or to_seq_length is None):
raise ValueError(
"When passing in rank 2 tensors to attention_layer, the values "
"for `batch_size`, `from_seq_length`, and `to_seq_length` "
"must all be specified.")

# Scalar dimensions referenced here:
# B = batch size (number of sequences)
# F = `from_tensor` sequence length
# T = `to_tensor` sequence length
# N = `num_attention_heads`
# H = `size_per_head`

from_tensor_2d = reshape_to_matrix(from_tensor)
to_tensor_2d = reshape_to_matrix(to_tensor)

# `query_layer` = [B*F, N*H]
query_layer = tf.layers.dense(
from_tensor_2d,
num_attention_heads * size_per_head,
activation=query_act,
name="query",
kernel_initializer=create_initializer(initializer_range))

# `key_layer` = [B*T, N*H]
key_layer = tf.layers.dense(
to_tensor_2d,
num_attention_heads * size_per_head,
activation=key_act,
name="key",
kernel_initializer=create_initializer(initializer_range))

# `value_layer` = [B*T, N*H]
value_layer = tf.layers.dense(
to_tensor_2d,
num_attention_heads * size_per_head,
activation=value_act,
name="value",
kernel_initializer=create_initializer(initializer_range))

# `query_layer` = [B, N, F, H]
query_layer = transpose_for_scores(query_layer, batch_size,
num_attention_heads, from_seq_length,
size_per_head)

# `key_layer` = [B, N, T, H]
key_layer = transpose_for_scores(key_layer, batch_size, num_attention_heads,
to_seq_length, size_per_head)

# Take the dot product between "query" and "key" to get the raw
# attention scores.
# `attention_scores` = [B, N, F, T]
attention_scores = tf.matmul(query_layer, key_layer, transpose_b=True)
attention_scores = tf.multiply(attention_scores,
1.0 / math.sqrt(float(size_per_head)))

if attention_mask is not None:
# `attention_mask` = [B, 1, F, T]
attention_mask = tf.expand_dims(attention_mask, axis=[1])

# Since attention_mask is 1.0 for positions we want to attend and 0.0 for
# masked positions, this operation will create a tensor which is 0.0 for
# positions we want to attend and -10000.0 for masked positions.
adder = (1.0 - tf.cast(attention_mask, tf.float32)) * -10000.0

# Since we are adding it to the raw scores before the softmax, this is
# effectively the same as removing these entirely.
attention_scores += adder

# Normalize the attention scores to probabilities.
# `attention_probs` = [B, N, F, T]
attention_probs = tf.nn.softmax(attention_scores)

# This is actually dropping out entire tokens to attend to, which might
# seem a bit unusual, but is taken from the original Transformer paper.
attention_probs = dropout(attention_probs, attention_probs_dropout_prob)

# `value_layer` = [B, T, N, H]
value_layer = tf.reshape(
value_layer,
[batch_size, to_seq_length, num_attention_heads, size_per_head])

# `value_layer` = [B, N, T, H]
value_layer = tf.transpose(value_layer, [0, 2, 1, 3])

# `context_layer` = [B, N, F, H]
context_layer = tf.matmul(attention_probs, value_layer)

# `context_layer` = [B, F, N, H]
context_layer = tf.transpose(context_layer, [0, 2, 1, 3])

if do_return_2d_tensor:
# `context_layer` = [B*F, N*H]
context_layer = tf.reshape(
context_layer,
[batch_size * from_seq_length, num_attention_heads * size_per_head])
else:
# `context_layer` = [B, F, N*H]
context_layer = tf.reshape(
context_layer,
[batch_size, from_seq_length, num_attention_heads * size_per_head])

return context_layer

transformer 网络结构代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def transformer_model(input_tensor,
attention_mask=None,
hidden_size=768,
num_hidden_layers=12,
num_attention_heads=12,
intermediate_size=3072,
intermediate_act_fn=gelu,
hidden_dropout_prob=0.1,
attention_probs_dropout_prob=0.1,
initializer_range=0.02,
do_return_all_layers=False):
"""Multi-headed, multi-layer Transformer from "Attention is All You Need".

This is almost an exact implementation of the original Transformer encoder.

See the original paper:
https://arxiv.org/abs/1706.03762

Also see:
https://github.com/tensorflow/tensor2tensor/blob/master/tensor2tensor/models/transformer.py

Args:
input_tensor: float Tensor of shape [batch_size, seq_length, hidden_size].
attention_mask: (optional) int32 Tensor of shape [batch_size, seq_length,
seq_length], with 1 for positions that can be attended to and 0 in
positions that should not be.
hidden_size: int. Hidden size of the Transformer.
num_hidden_layers: int. Number of layers (blocks) in the Transformer.
num_attention_heads: int. Number of attention heads in the Transformer.
intermediate_size: int. The size of the "intermediate" (a.k.a., feed
forward) layer.
intermediate_act_fn: function. The non-linear activation function to apply
to the output of the intermediate/feed-forward layer.
hidden_dropout_prob: float. Dropout probability for the hidden layers.
attention_probs_dropout_prob: float. Dropout probability of the attention
probabilities.
initializer_range: float. Range of the initializer (stddev of truncated
normal).
do_return_all_layers: Whether to also return all layers or just the final
layer.

Returns:
float Tensor of shape [batch_size, seq_length, hidden_size], the final
hidden layer of the Transformer.

Raises:
ValueError: A Tensor shape or parameter is invalid.
"""
if hidden_size % num_attention_heads != 0:
raise ValueError(
"The hidden size (%d) is not a multiple of the number of attention "
"heads (%d)" % (hidden_size, num_attention_heads))

attention_head_size = int(hidden_size / num_attention_heads)
input_shape = get_shape_list(input_tensor, expected_rank=3)
batch_size = input_shape[0]
seq_length = input_shape[1]
input_width = input_shape[2]

# The Transformer performs sum residuals on all layers so the input needs
# to be the same as the hidden size.
if input_width != hidden_size:
raise ValueError("The width of the input tensor (%d) != hidden size (%d)" %
(input_width, hidden_size))

# We keep the representation as a 2D tensor to avoid re-shaping it back and
# forth from a 3D tensor to a 2D tensor. Re-shapes are normally free on
# the GPU/CPU but may not be free on the TPU, so we want to minimize them to
# help the optimizer.
prev_output = reshape_to_matrix(input_tensor)

all_layer_outputs = []
for layer_idx in range(num_hidden_layers):
with tf.variable_scope("layer_%d" % layer_idx):
layer_input = prev_output

with tf.variable_scope("attention"):
attention_heads = []
with tf.variable_scope("self"):
attention_head = attention_layer(
from_tensor=layer_input,
to_tensor=layer_input,
attention_mask=attention_mask,
num_attention_heads=num_attention_heads,
size_per_head=attention_head_size,
attention_probs_dropout_prob=attention_probs_dropout_prob,
initializer_range=initializer_range,
do_return_2d_tensor=True,
batch_size=batch_size,
from_seq_length=seq_length,
to_seq_length=seq_length)
attention_heads.append(attention_head)

attention_output = None
if len(attention_heads) == 1:
attention_output = attention_heads[0]
else:
# In the case where we have other sequences, we just concatenate
# them to the self-attention head before the projection.
attention_output = tf.concat(attention_heads, axis=-1)

# Run a linear projection of `hidden_size` then add a residual
# with `layer_input`.
with tf.variable_scope("output"):
attention_output = tf.layers.dense(
attention_output,
hidden_size,
kernel_initializer=create_initializer(initializer_range))
attention_output = dropout(attention_output, hidden_dropout_prob)
attention_output = layer_norm(attention_output + layer_input)

# The activation is only applied to the "intermediate" hidden layer.
with tf.variable_scope("intermediate"):
intermediate_output = tf.layers.dense(
attention_output,
intermediate_size,
activation=intermediate_act_fn,
kernel_initializer=create_initializer(initializer_range))

# Down-project back to `hidden_size` then add the residual.
with tf.variable_scope("output"):
layer_output = tf.layers.dense(
intermediate_output,
hidden_size,
kernel_initializer=create_initializer(initializer_range))
layer_output = dropout(layer_output, hidden_dropout_prob)
layer_output = layer_norm(layer_output + attention_output)
prev_output = layer_output
all_layer_outputs.append(layer_output)

if do_return_all_layers:
final_outputs = []
for layer_output in all_layer_outputs:
final_output = reshape_from_matrix(layer_output, input_shape)
final_outputs.append(final_output)
return final_outputs
else:
final_output = reshape_from_matrix(prev_output, input_shape)
return final_output

Multi-Head Attention (MHA) Intra-MHA Inter-MHA

最近看了一篇基于aspect的情感识别,其中的inter 和 Intra 多头attention不错。所以来写一下

论文实现代码

Attentional Encoder Network for Targeted Sentiment Classification

论文网络结构

1556183783851

其中的Intra-attention、Inter-attention实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# -*- coding: utf-8 -*-
# file: attention.py
# author: songyouwei <youwei0314@gmail.com>
# Copyright (C) 2018. All Rights Reserved.

import math
import torch
import torch.nn as nn
import torch.nn.functional as F


class Attention(nn.Module):
def __init__(self, embed_dim, hidden_dim=None, out_dim=None, n_head=1, score_function='dot_product', dropout=0):
''' Attention Mechanism
:param embed_dim:
:param hidden_dim:
:param out_dim:
:param n_head: num of head (Multi-Head Attention)
:param score_function: scaled_dot_product / mlp (concat) / bi_linear (general dot)
:return (?, q_len, out_dim,)
'''
super(Attention, self).__init__()
if hidden_dim is None:
hidden_dim = embed_dim // n_head
if out_dim is None:
out_dim = embed_dim
self.embed_dim = embed_dim
self.hidden_dim = hidden_dim
self.n_head = n_head
self.score_function = score_function
self.w_k = nn.Linear(embed_dim, n_head * hidden_dim)
self.w_q = nn.Linear(embed_dim, n_head * hidden_dim)
self.proj = nn.Linear(n_head * hidden_dim, out_dim)
self.dropout = nn.Dropout(dropout)
if score_function == 'mlp':
self.weight = nn.Parameter(torch.Tensor(hidden_dim * 2))
elif self.score_function == 'bi_linear':
self.weight = nn.Parameter(torch.Tensor(hidden_dim, hidden_dim))
else: # dot_product / scaled_dot_product
self.register_parameter('weight', None)
self.reset_parameters()

def reset_parameters(self):
stdv = 1. / math.sqrt(self.hidden_dim)
if self.weight is not None:
self.weight.data.uniform_(-stdv, stdv)

def forward(self, k, q):
if len(q.shape) == 2: # q_len missing
q = torch.unsqueeze(q, dim=1)
if len(k.shape) == 2: # k_len missing
k = torch.unsqueeze(k, dim=1)
mb_size = k.shape[0] # ?
k_len = k.shape[1]
q_len = q.shape[1]
# k: (?, k_len, embed_dim,)
# q: (?, q_len, embed_dim,)
# kx: (n_head*?, k_len, hidden_dim)
# qx: (n_head*?, q_len, hidden_dim)
# score: (n_head*?, q_len, k_len,)
# output: (?, q_len, out_dim,)
kx = self.w_k(k).view(mb_size, k_len, self.n_head, self.hidden_dim)
kx = kx.permute(2, 0, 1, 3).contiguous().view(-1, k_len, self.hidden_dim)
qx = self.w_q(q).view(mb_size, q_len, self.n_head, self.hidden_dim)
qx = qx.permute(2, 0, 1, 3).contiguous().view(-1, q_len, self.hidden_dim)
if self.score_function == 'dot_product':
kt = kx.permute(0, 2, 1) # (n_head*?, hidden_dim,k_len)
score = torch.bmm(qx, kt) # (n_head*? , q_len , k_len)
elif self.score_function == 'scaled_dot_product':
kt = kx.permute(0, 2, 1)
qkt = torch.bmm(qx, kt)
score = torch.div(qkt, math.sqrt(self.hidden_dim))
elif self.score_function == 'mlp':
# 使 q_len k_len拼接。
kxx = torch.unsqueeze(kx, dim=1).expand(-1, q_len, -1, -1)
qxx = torch.unsqueeze(qx, dim=2).expand(-1, -1, k_len, -1)
kq = torch.cat((kxx, qxx), dim=-1) # (n_head*?, q_len, k_len, hidden_dim*2)
# kq = torch.unsqueeze(kx, dim=1) + torch.unsqueeze(qx, dim=2)
score = F.tanh(torch.matmul(kq, self.weight)) # (n_head*? , q_len,k_len)
elif self.score_function == 'bi_linear':
qw = torch.matmul(qx, self.weight)
kt = kx.permute(0, 2, 1)
score = torch.bmm(qw, kt)
else:
raise RuntimeError('invalid score_function')
score = F.softmax(score, dim=-1)
output = torch.bmm(score, kx) # (n_head*?, q_len, hidden_dim)
output = torch.cat(torch.split(output, mb_size, dim=0), dim=-1) # (?, q_len, n_head*hidden_dim)
output = self.proj(output) # (?, q_len, out_dim)
output = self.dropout(output)
return output, score


class NoQueryAttention(Attention):
'''q is a parameter'''

def __init__(self, embed_dim, hidden_dim=None, out_dim=None, n_head=1, score_function='dot_product', q_len=1,
dropout=0):
super(NoQueryAttention, self).__init__(embed_dim, hidden_dim, out_dim, n_head, score_function, dropout)
self.q_len = q_len
self.q = nn.Parameter(torch.Tensor(q_len, embed_dim))
self.reset_q()

def reset_q(self):
stdv = 1. / math.sqrt(self.embed_dim)
self.q.data.uniform_(-stdv, stdv)

def forward(self, k, **kwargs):
mb_size = k.shape[0]
q = self.q.expand(mb_size, -1, -1)
return super(NoQueryAttention, self).forward(k, q)
-------------The End-------------